[FEATURE]: Add timeout to prevent long-running functions (Closes #658) #1241
src/builder/analyzer.rs
Outdated
| let behavior_version = executor.behavior_version(); | ||
| let timeout = executor.timeout() | ||
| .or(execution_options_timeout) | ||
| .or(Some(Duration::from_secs(300))); |
Let's put the default value into a global const. On second thought, we may want to start from a larger value: use 1800 seconds for now (once things are more stable, I'll gradually reduce it).
Done. Added TIMEOUT_THRESHOLD and WARNING_THRESHOLD to both src/builder/analyzer.rs and src/execution/evaluator.rs
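A minimal sketch of the fallback chain discussed above, with the default pulled into a global const. The helper name `resolve_timeout` and its parameters are hypothetical and only illustrate the precedence (executor setting first, then execution options, then the default); the actual code in the PR may be structured differently.

```rust
use std::time::Duration;

// Global default, set to the 1800 seconds requested in review.
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);

/// Hypothetical helper: picks the first timeout that is set, falling back to the default.
fn resolve_timeout(
    executor_timeout: Option<Duration>,
    execution_options_timeout: Option<Duration>,
) -> Duration {
    executor_timeout
        .or(execution_options_timeout)
        .unwrap_or(TIMEOUT_THRESHOLD)
}
```

Using `unwrap_or` at the end of the chain yields a plain `Duration` rather than an `Option` that is always `Some`.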
src/execution/evaluator.rs
Outdated
| let op_name_for_warning = op.name.clone(); | ||
| let op_kind_for_warning = op.op_kind.clone(); | ||
| let warn_handle = tokio::spawn(async move { |
I think we don't need tokio::spawn.
tokio::select! is sufficient (example)
src/execution/evaluator.rs
Outdated
| eprintln!( | ||
| "WARNING: Function '{}' ({}) is taking longer than 30s", | ||
| op_kind_for_warning, op_name_for_warning | ||
| ); | ||
| warn!( | ||
| "Function '{}' ({}) is taking longer than 30s", | ||
| op_kind_for_warning, op_name_for_warning | ||
| ); |
We only need one. Let's keep warn! and get rid of eprintln!
| // Assemble input values | ||
| let input_values: Vec<value::Value> = | ||
| assemble_input_values(&op.input.fields, scoped_entries) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| // Create field_values vector for all fields in the merged schema | ||
| let mut field_values = op | ||
| .field_index_mapping | ||
| .iter() | ||
| .map(|idx| { | ||
| idx.map_or(value::Value::Null, |input_idx| { | ||
| input_values[input_idx].clone() | ||
| }) | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
|
|
||
| // Handle auto_uuid_field (assumed to be at position 0 for efficiency) | ||
| if op.has_auto_uuid_field { | ||
| if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx { | ||
| let uuid = memory.next_uuid( | ||
| op.fingerprinter | ||
| .clone() | ||
| .with( | ||
| &field_values | ||
| .iter() | ||
| .enumerate() | ||
| .filter(|(i, _)| *i != uuid_idx) | ||
| .map(|(_, v)| v) | ||
| .collect::<Vec<_>>(), | ||
| )? | ||
| .into_fingerprint(), | ||
| )?; | ||
| field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid)); | ||
| } | ||
| } | ||
This code should be kept (merge mistake?). Please bring it back.
| pub struct RetryOptions { | ||
| pub retry_timeout: Option<Duration>, | ||
| pub per_call_timeout: Option<Duration>, |
per_call_timeout is not used. Why do we need this?
| class _ExecutionOptions: | ||
| max_inflight_rows: int | None = None | ||
| max_inflight_bytes: int | None = None | ||
| timeout: int | None = None |
Let's use datetime.timedelta | None here; we prefer a stronger type.
| batching: bool = False | ||
| max_batch_size: int | None = None | ||
| behavior_version: int | None = None | ||
| timeout: int | None = None |
Same here: let's use datetime.timedelta | None.
| }; | ||
| use futures::future::{BoxFuture, try_join3}; | ||
| use futures::{FutureExt, future::try_join_all}; | ||
| use tokio::time::Duration; |
This one is an alias. Let's directly use std::time::Duration.
| use futures::{FutureExt, future::try_join_all}; | ||
| use tokio::time::Duration; | ||
|
|
||
| const TIMEOUT_THRESHOLD: u64 = 1800; |
Let's use Duration type here.
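A small sketch of what typed constants could look like, assuming the 30-second warning and 1800-second timeout values mentioned in this thread. `Duration::from_secs` is a `const fn`, so it can be used in a `const` item. The compile-time ordering check is an optional extra and not something the PR is known to include.

```rust
use std::time::Duration;

// Thresholds carry their unit in the type instead of being bare u64 seconds.
const WARNING_THRESHOLD: Duration = Duration::from_secs(30);
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);

// Optional guard (an assumption, not from the PR): fail the build if the
// warning would fire at or after the hard timeout.
const _: () = assert!(WARNING_THRESHOLD.as_secs() < TIMEOUT_THRESHOLD.as_secs());
```

With this shape, call sites pass the constants directly without wrapping them in `Duration::from_secs(...)`.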
Summary:
Implements a default 30-second warning and 300-second abort mechanism for functions in src/execution/evaluator.rs. It also allows per-function overrides by specifying a timeout in the decorator arguments, similar to how caching is configured.
Fixes #658
Examples:
In examples/manuals_llm_extraction/main.py, we can add @cocoindex.op.executor_class(timeout=15) to override the current 300-second timeout.
Checklist: